Dubbo Feature - 01. 线程模型

Overview

了解本部分请先参考: Dubbo User Book - 6.4 线程模型 · GitBook

相关代码位于 dubbo-remoting-api

Call stack

如果你曾经在Provider端的服务实现类中打过断点, 会发现调用是从ChannelEventRunnable.run()开始的, 这是因为默认情况下, Provider端的业务调用都是在一个单独的线程池中执行的, 即WrappedChannelHandler中的executor.

从IO线程中获取的数据分发到业务线程池, 这一功能在Dubbo被抽象成了Dispatcher:

1
ChannelHandler dispatch(ChannelHandler handler, URL url);

根据此类上的@SPI(AllDispatcher.NAME)也可知默认为AllDispatcher. 实际的逻辑封装在AllDispatcher中返回的AllChannelHandler. 在其中的received方法中打断点, 调用栈如下:

1
2
3
4
NettyServer.received(Channel, Object)
-> MultiMessageHandler.received(Channel, Object)
-> HeartbeatHandler.received(Channel, Object)
-> AllChannelHandler.received(Channel, Object)

由此Provider端调用栈即可形成一个闭环: 从Netty -> 具体的业务实现.

线程模型 - Dispatcher

除了DirectDispatcher之外, 每一种配置都对应了一种ChannelHandler的实现类. 他们都是WrappedChannelHandler的子类.

ChannelHandler在Dubbo中很重要, 还有很多相关的子类暂时不需要关注. 我们先来看一下: DirectDispatcherAllDispatcher

  • DirectDispatcher最简单, 它直接返回了传入的ChannelHandler, 这个类即为DecodeHandler. 联系上面的调用栈可知下一步调用即为DecodeHandler#received
  • AllDispatcher覆盖了WrappedChannelHandler的几乎所有方法, 都是实例化一个ChannelEventRunnable, 交给executor执行. 在ChannelEventRunnablerun方法中再执行DecodeHandler的后续操作.

如果了解过Dubbo的其他代码, 可以发现这又是常见的装饰模式. 四个类对后续的ChannelHandler(实际上是DecodeHandler)进行装饰, 实现内容实际上比较简单.

这里有个值得注意的问题是, execution 这一策略, 并没有实现. 具体实现跟文档描述并不一致. 可以参考dispatcher:execution · Issue #1089 · apache/incubator-dubbo · GitHub 以及 fixes #1089, make ExecutionDispatcher meet dubbo-user-book by qct · Pull Request #1449 · apache/incubator-dubbo · GitHub

线程池配置

线程池的实例化在WrappedChannelHandler中:

1
executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);

默认情况下使用FixedThreadPool, 位于dubbo-commons

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class FixedThreadPool implements ThreadPool {

public Executor getExecutor(URL url) {
String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
queues == 0 ? new SynchronousQueue<Runnable>() :
(queues < 0 ? new LinkedBlockingQueue<Runnable>()
: new LinkedBlockingQueue<Runnable>(queues)),
new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url));
}

}

可见默认情况下, 创建一个大小为200的线程池; 使用队列SynchronousQueue, 即可看作没有使用队列缓存元素. 并在饱和时抛出异常, 同时jstack日志到文件(AbortPolicyWithReport) .

你或许需要使用MessageOnlyDispatcher:

如果Dubbo线程池占满时, 很可能发现调用端获得的是超时异常.

注意上文提到的WrappedChannelHandler中有一个caught方法, 当发生异常时(暂时不需要了解此处的调用关系)会被执行. 参考下方See Also第一篇, 使用AllChannelHandler策略时, caught方法由于线程池满也被拒绝, 导致无法返回, 调用端只能等到Timeout.

另外一个问题是, AllDispatcher中如果判断executor为空或者已经shutdown, 会去拿SHARED_EXECUTOR. 这种场景并没有想到何时回出现, 希望知道的朋友指点.

Call Stack 上游

1
2
3
4
protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
.getAdaptiveExtension().dispatch(handler, url))); // 此处断点. handler就是上文提到的DecodeHandler
}

省略了部分调用:

1
2
3
4
5
6
7
8
ServiceBean.doExport()
-> DubboProtocol.export()
-> DubboProtocol.createServer
-> HeaderExchangers.bind
-> NettyTransporter.bind
-> NettyServer.<init>
-> ChannelHandlers.wrap
-> ChannelHandlers.wrapInternal

See Also